iT邦幫忙

2025 iThome 鐵人賽

DAY 19
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 19

Day 19: 聚合算子 Part 1 - Aggregate 執行流程

  • 分享至 

  • xImage
  •  

前言

在昨天的文章中,我們探討了無狀態的 ProjectionExecFilterExec 算子。今天我們將學習需要狀態管理的聚合算子(AggregateExec)

考慮這個查詢:

SELECT department, COUNT(*), SUM(salary), AVG(age)
FROM employees
GROUP BY department;

聚合操作的特殊挑戰:

  1. 累積狀態:需要持續累加計數、總和等中間結果
  2. 分組管理:為每個 department 維護獨立的聚合狀態
  3. 記憶體控制:高基數分組(如 user_id)可能需要維護數百萬個狀態
  4. 分散式協調:在多個節點間進行部分聚合和最終合併

今天的學習目標:

  1. 理解 AggregateExec 的整體架構設計
  2. 掌握 Partial 和 Final 兩階段聚合的工作原理
  3. 了解 GROUP BY 的分組邏輯實現
  4. 認識 Accumulator(累積器)的核心概念

AggregateExec 的整體設計

核心資料結構

// datafusion/physical-plan/src/aggregates/mod.rs
#[derive(Debug, Clone)]
pub struct AggregateExec {
    /// 聚合模式(Partial、Final、Single 等)
    mode: AggregateMode,
    
    /// GROUP BY 表達式
    group_by: PhysicalGroupBy,
    
    /// 聚合函數表達式(如 SUM、COUNT、AVG)
    aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
    
    /// FILTER 子句(條件聚合)
    filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
    
    /// 輸入執行計劃
    input: Arc<dyn ExecutionPlan>,
    
    /// 輸出 Schema
    schema: SchemaRef,
    // ... 其他欄位
}

關鍵欄位說明:

1. AggregateMode - 聚合模式

pub enum AggregateMode {
    /// 部分聚合:第一階段,可並行執行
    Partial,
    /// 最終聚合:第二階段,合併部分聚合結果
    Final,
    /// 單階段聚合:在單一算子中完成全部聚合
    Single,
    // ... 其他變體
}

2. PhysicalGroupBy - 分組表達式

對於 GROUP BY department

  • expr 包含:[(Column("department"), "department")]
  • 對於多欄位分組,可以包含多個表達式

3. aggr_expr - 聚合函數表達式

每個 AggregateFunctionExpr 代表一個聚合函數,如 SUM(salary)COUNT(*)

三種執行策略

fn execute_typed(&self, partition: usize, context: Arc<TaskContext>) -> Result<StreamType> {
    // 情況 1: 沒有 GROUP BY(全局聚合)
    if self.group_by.expr.is_empty() {
        return Ok(StreamType::AggregateStream(...));
    }

    // 情況 2: 有 GROUP BY + LIMIT(Top-K 優化)
    if let Some(limit) = self.limit {
        return Ok(StreamType::GroupedPriorityQueue(...));
    }

    // 情況 3: 普通的 GROUP BY(Hash 聚合)
    Ok(StreamType::GroupedHash(...))
}

Partial 和 Final 兩階段聚合

為何需要兩階段聚合?

考慮分散式場景:數據分佈在 3 個節點,每個節點 1000 萬行。

單階段方案(效率低)

  • 網路傳輸 3000 萬行原始資料(數 GB)
  • 協調節點成為瓶頸

兩階段方案(高效)

第一階段 (Partial):
  節點 1: 1000萬行 → 本地聚合 → 1000 個分組結果
  節點 2: 1000萬行 → 本地聚合 → 1000 個分組結果
  節點 3: 1000萬行 → 本地聚合 → 1000 個分組結果

第二階段 (Final):
  協調節點: 合併 3000 個部分結果 → 1000 個最終分組

優勢:
- 網路傳輸量減少到幾 MB(500 倍以上)
- 充分利用各節點的並行計算能力

Partial 模式 - 產生中間狀態

AVG(salary) 為例:

原始數據:
  department | salary
  -----------+--------
  IT         | 80000
  IT         | 90000
  Sales      | 60000

Partial 聚合後:
  department | sum_salary | count
  -----------+------------+-------
  IT         | 170000     | 2
  Sales      | 60000      | 1

注意:輸出的是 sum 和 count,而非平均值!

為何需要中間狀態?

如果直接輸出平均值:

錯誤:
  節點 1: IT 平均 85000 (3 個樣本)
  節點 2: IT 平均 88000 (2 個樣本)
  合併: (85000 + 88000) / 2 = 86500  ❌ 錯誤!

正確:
  節點 1: sum=255000, count=3
  節點 2: sum=176000, count=2
  合併: (255000 + 176000) / (3 + 2) = 86200  ✓

Final 模式 - 合併並計算最終結果

輸入(來自多個 Partial 階段):
  department | sum_salary | count
  -----------+------------+-------
  IT         | 255000     | 3      (節點 1)
  IT         | 176000     | 2      (節點 2)
  Sales      | 125000     | 2      (節點 1)

Final 聚合:
  IT:    sum = 431000, count = 5 → avg = 86200
  Sales: sum = 125000, count = 2 → avg = 62500

關鍵差異對比

方面 Partial 模式 Final 模式
輸入 原始數據行 中間狀態
處理方法 update_batch() merge_batch()
輸出 中間狀態 最終聚合值
並行性 高(任意分區) 受限(需按鍵分區)

GROUP BY 的分組邏輯

GroupValues 的設計

DataFusion 使用 Hash Table 管理分組:

pub trait GroupValues {
    /// 將新的分組值加入,返回每個值對應的分組索引
    fn intern(
        &mut self,
        cols: &[ArrayRef],
        group_indices: &mut Vec<usize>,
    ) -> Result<()>;
    
    fn len(&self) -> usize;
    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
}

intern 方法的作用

// 輸入 batch:
// department | salary
// -----------+--------
// IT         | 80000
// Sales      | 60000
// IT         | 90000

group_values.intern(&[department_array], &mut group_indices)?;

// 執行過程:
// 第 0 行 (IT):    hash查找 → 沒找到 → 創建 group 0 → indices[0] = 0
// 第 1 行 (Sales): hash查找 → 沒找到 → 創建 group 1 → indices[1] = 1
// 第 2 行 (IT):    hash查找 → 找到 group 0 → indices[2] = 0

// 結果: group_indices = [0, 1, 0]

這個 group_indices 會傳給累積器,告訴它每一行應該更新哪個分組。

分組聚合流程

fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
    // 1. 評估 GROUP BY 表達式
    let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
    
    // 2. 評估聚合函數的輸入
    let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
    
    // 3. 確定每行對應的分組索引
    self.group_values.intern(&group_by_values, &mut self.current_group_indices)?;
    
    // 4. 更新每個累積器
    for (accumulator, values) in accumulators.iter_mut().zip(input_values.iter()) {
        accumulator.update_batch(&self.current_group_indices, values)?;
    }
    
    Ok(())
}

Accumulator - 聚合函數的狀態管理器

Accumulator Trait

pub trait Accumulator: Send + Sync + Debug {
    /// 使用新的輸入值更新累積器狀態
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
    
    /// 返回最終的聚合結果
    fn evaluate(&mut self) -> Result<ScalarValue>;
    
    /// 返回累積器的中間狀態(用於 Partial 模式)
    fn state(&mut self) -> Result<Vec<ScalarValue>>;
    
    /// 合併其他累積器的狀態(用於 Final 模式)
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
}

實例 1: SUM 累積器

#[derive(Debug)]
pub struct SumAccumulator {
    sum: Option<i64>,
}

impl Accumulator for SumAccumulator {
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let array = as_primitive_array::<Int64Type>(&values[0])?;
        if let Some(batch_sum) = arrow::compute::sum(array) {
            self.sum = Some(self.sum.unwrap_or(0) + batch_sum);
        }
        Ok(())
    }
    
    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(ScalarValue::Int64(self.sum))
    }
    
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::Int64(self.sum)])
    }
    
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let array = as_primitive_array::<Int64Type>(&states[0])?;
        if let Some(batch_sum) = arrow::compute::sum(array) {
            self.sum = Some(self.sum.unwrap_or(0) + batch_sum);
        }
        Ok(())
    }
}

實例 2: AVG 累積器

#[derive(Debug)]
pub struct AvgAccumulator {
    sum: Option<f64>,
    count: u64,
}

impl Accumulator for AvgAccumulator {
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let array = as_primitive_array::<Float64Type>(&values[0])?;
        
        if let Some(batch_sum) = arrow::compute::sum(array) {
            self.sum = Some(self.sum.unwrap_or(0.0) + batch_sum);
        }
        
        self.count += (array.len() - array.null_count()) as u64;
        Ok(())
    }
    
    fn evaluate(&mut self) -> Result<ScalarValue> {
        match self.sum {
            Some(sum) if self.count > 0 => {
                Ok(ScalarValue::Float64(Some(sum / self.count as f64)))
            }
            _ => Ok(ScalarValue::Float64(None)),
        }
    }
    
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        // Partial 模式: 返回 sum 和 count
        Ok(vec![
            ScalarValue::Float64(self.sum),
            ScalarValue::UInt64(Some(self.count)),
        ])
    }
    
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        // Final 模式: 合併 sum 和 count
        let sum_array = as_primitive_array::<Float64Type>(&states[0])?;
        let count_array = as_primitive_array::<UInt64Type>(&states[1])?;
        
        if let Some(batch_sum) = arrow::compute::sum(sum_array) {
            self.sum = Some(self.sum.unwrap_or(0.0) + batch_sum);
        }
        if let Some(batch_count) = arrow::compute::sum(count_array) {
            self.count += batch_count;
        }
        
        Ok(())
    }
}

AVG 的完整流程

Partial 階段:
  update_batch([80, 90, 85]) → sum=255, count=3
  state() → [Float64(255), UInt64(3)]

Final 階段:
  merge_batch([255], [3]) → sum=255, count=3
  merge_batch([180], [2]) → sum=435, count=5
  evaluate() → Float64(87.0)

GroupsAccumulator - 批次化優化

對於有 GROUP BY 的查詢,為每個分組創建獨立的 Accumulator 效率低。GroupsAccumulator 同時管理多個分組:

pub trait GroupsAccumulator: Send + Sync {
    /// 更新多個分組的狀態
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],  // 關鍵:每個值對應的分組索引
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()>;
    
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
}

性能優勢

1000 個分組,每組 10000 行:

方案 1: 1000 個獨立 Accumulator
  - 1000 次虛擬函數調用
  - 處理時間: ~100 ms

方案 2: 單個 GroupsAccumulator
  - 1 次調用,內部向量化處理
  - 處理時間: ~10 ms

性能提升: 10 倍

小結

今天我們深入探討了 DataFusion 的聚合算子:

AggregateExec 的設計

  • 狀態管理:跨 batch 累積狀態,與無狀態算子的本質區別
  • 執行策略:根據 GROUP BY 和 LIMIT 選擇最優路徑
  • 核心組件AggregateModePhysicalGroupByAggregateFunctionExpr

兩階段聚合

  • Partial 模式:本地處理原始數據,產生可合併的中間狀態
  • Final 模式:合併中間狀態,計算最終結果
  • 性能優勢:減少網路傳輸 500 倍以上,充分利用並行計算

GROUP BY 與 Accumulator

  • GroupValues:使用 Hash Table 管理分組,intern 方法分配分組索引
  • Accumulator:四個核心方法 update_batchmerge_batchevaluatestate
  • GroupsAccumulator:批次化處理多個分組,顯著提升性能

兩階段聚合是分散式數據處理的核心模式,不僅用於 DataFusion,也是 Spark、Flink 等系統的基礎設計。

明天我們將探討聚合算子 Part 2 - Hash vs Sort Aggregate,比較不同聚合策略的適用場景和性能權衡。

參考資料

  1. DataFusion AggregateExec 原始碼
  2. Accumulator Trait 定義
  3. GroupedHashAggregateStream 實現
  4. Spark 的聚合優化經驗
  5. Efficient Aggregation on Modern Hardware

上一篇
Day 18: 基礎執行算子 - Projection 和 Filter
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言